/*
 * CRtpMediaChannel.cpp
 *
 *  Created on: Apr 6, 2017
 *      Author: chuanjiang.zh
 */

#include "CRtpMediaChannel.h"
#include "RtpHeader.h"
#include "RtpConst.h"
#include "H264RtpPackager.h"
#include "CRtpPackager.h"
#include "PcmRtpPackager.h"

#include "H264NaluParser.h"
#include "RtpPort.h"
#include <algorithm>
#include "RtpHeader.h"
#include "TFileUtil.h"
#include "TStringUtil.h"
#include "JRTPSession.h"

#include "CLog.h"
#include "RtpUrl.h"


// Value passed to setStackSize() by the CRtpMediaChannel constructor: 2 MiB.
static const size_t THREAD_STACK_SIZE = static_cast<size_t>(2) << 20;


namespace av
{

/**
 * Construct a channel with default settings.
 *
 * Defaults: packet size limit RTP_MAX_SIZE, payload type DYNAMIC_PAYLOAD,
 * media type MEDIA_TYPE_NONE, value-initialized packet counter and last
 * timestamp, and delivery of received packets enabled.
 * THREAD_STACK_SIZE is handed to setStackSize().
 */
CRtpMediaChannel::CRtpMediaChannel()
	: m_maxPktSize(RTP_MAX_SIZE)
	, m_payload(DYNAMIC_PAYLOAD)
	, m_mediaType(MEDIA_TYPE_NONE)
	, m_packetCount()
	, m_lastTime()
	, m_recvEnabled(true)
{
	setStackSize(THREAD_STACK_SIZE);
}

/**
 * Destructor: runs close() so the channel is shut down before the object goes away.
 */
CRtpMediaChannel::~CRtpMediaChannel()
{
	this->close();
}

int CRtpMediaChannel::open(const std::string& url, const std::string& params)
{
	std::string ip;
	int port = 0;
	RtpUrl::parse(url, ip, port);

	if (port == 0)
	{
		port = RtpPort::make();
	}

	for (size_t i = 0; i < RtpPort::count(); ++ i)
	{
		if (openRtpSession(port))
		{
			break;
		}
		else
		{
			port = RtpPort::make();
		}
	}

	if (!m_session)
	{
		return ENOENT;
	}

	m_localAddr.set(ip, port);

	start();

	return 0;
}

/**
 * Shut the channel down.
 *
 * Steps, in this order: stop(), closeRtpSession(), clearCache().
 * The session is released only after stop() has returned.
 */
void CRtpMediaChannel::close()
{
	stop();             // halt the worker first
	closeRtpSession();  // then release the RTP session
	clearCache();       // finally discard packets still queued
}

bool CRtpMediaChannel::isOpen()
{
	return (m_session != NULL);
}

std::string CRtpMediaChannel::getUrl()
{
	return m_url;
}

/**
 * Switch the channel state to STATE_PLAYING via setState().
 *
 * @return always 0.
 */
int CRtpMediaChannel::startStream()
{
	const int result = 0;
	setState(STATE_PLAYING);
	return result;
}

/**
 * Switch the channel state to STATE_PAUSED via setState().
 *
 * @return always 0.
 */
int CRtpMediaChannel::pauseStream()
{
	const int result = 0;
	setState(STATE_PAUSED);
	return result;
}

void CRtpMediaChannel::stopStream()
{
	setState(STATE_STOPPED);
}

int CRtpMediaChannel::getState()
{
	return m_state;
}

/**
 * Key-frame request hook; this channel does nothing with it.
 *
 * @return always 0.
 */
int CRtpMediaChannel::forceKeyFrame()
{
	// Intentionally a no-op.
	const int result = 0;
	return result;
}


/**
 * Select the RTP packager that matches the announced media format.
 *
 * The codec is read from fmt.m_codec when this channel carries video and
 * from fmt.m_audioCodec otherwise. H.264 gets an H264RtpPackager, both
 * G.711 variants get a PcmRtpPackager, and every other codec falls back
 * to the generic CRtpPackager.
 *
 * @param fmt  format description received from upstream.
 */
void CRtpMediaChannel::onMediaFormat(const MediaFormat& fmt)
{
	int codec = fmt.m_audioCodec;
	if (m_mediaType == MEDIA_TYPE_VIDEO)
	{
		codec = fmt.m_codec;
	}

	const bool isH264 = (codec == MEDIA_CODEC_H264);
	const bool isG711 = (codec == MEDIA_CODEC_G711U) || (codec == MEDIA_CODEC_G711A);

	if (isH264)
	{
		m_packager.reset(new H264RtpPackager());
	}
	else if (isG711)
	{
		m_packager.reset(new PcmRtpPackager());
	}
	else
	{
		m_packager.reset(new CRtpPackager());
	}
}

/**
 * Accept a media packet from upstream and queue it for sending.
 *
 * Null packets, empty packets and packets whose type differs from this
 * channel's media type are dropped. Everything else is pushed onto
 * m_pktQueue, from which run() later pops and sends it.
 *
 * @param pkt  packet to enqueue.
 */
void CRtpMediaChannel::onMediaPacket(MediaPacketPtr& pkt)
{
	// Guard against a null pointer, matching the check in sendPacket().
	if (!pkt || pkt->empty())
	{
		return;
	}

	if (pkt->type() != m_mediaType)
	{
		return;
	}

	m_pktQueue.push(pkt);
}


/**
 * Media event hook; events are ignored by this channel.
 *
 * @param event  event code (unused).
 */
void CRtpMediaChannel::onMediaEvent(int event)
{
	// Nothing to do.
}


/**
 * @return the channel name, which is a copy of m_url.
 */
std::string CRtpMediaChannel::getName()
{
	std::string name(m_url);
	return name;
}


/**
 * @return a copy of the local address recorded by open().
 */
NetAddress CRtpMediaChannel::getLocalAddr()
{
	NetAddress addr(m_localAddr);
	return addr;
}


/**
 * Set the packet size limit stored in m_maxPktSize.
 *
 * @param size  new limit; accepted only when it lies in 1..RTP_MAX_SIZE,
 *              otherwise the current limit is left unchanged.
 */
void CRtpMediaChannel::setMaxPacetSize(size_t size)
{
	const bool outOfRange = (size == 0) || (size > RTP_MAX_SIZE);
	if (outOfRange)
	{
		return;
	}

	m_maxPktSize = size;
}


void CRtpMediaChannel::setPayload(uint8_t payload)
{
	m_payload = payload;
}


uint8_t CRtpMediaChannel::getPayload()
{
	return m_payload;
}

void CRtpMediaChannel::setMediaType(int mediaType)
{
	m_mediaType = mediaType;
}

int CRtpMediaChannel::getMediaType()
{
	return m_mediaType;
}

/**
 * Set (or clear) the single peer this channel sends to.
 *
 * With a positive @p port the peer is registered as a session destination.
 * If a peer was already registered it is removed first, because otherwise
 * the old destination would stay in the session and could never be removed
 * again (only m_peerAddr is remembered). With a non-positive @p port the
 * currently remembered peer is removed. In both cases m_peerAddr is then
 * updated to the given values.
 *
 * @param ip    peer IPv4 address in dotted notation.
 * @param port  peer RTP port; <= 0 clears the peer.
 */
void CRtpMediaChannel::setPeerAddr(const std::string& ip, int port)
{
	CLog::info("setPeerAddr %s:%d\n", ip.c_str(), port);

	const bool hadPeer = (m_peerAddr.m_port > 0);

	if (port > 0)
	{
		if (hadPeer)
		{
			// Replace, do not accumulate: drop the previous destination.
			removeDestination(m_peerAddr.m_ip, m_peerAddr.m_port);
		}

		addDestination(ip, port);
	}
	else
	{
		removeDestination(m_peerAddr.m_ip, m_peerAddr.m_port);
	}

	m_peerAddr.set(ip, port);
}

/**
 * @return a copy of the peer address last given to setPeerAddr().
 */
NetAddress CRtpMediaChannel::getPeerAddr()
{
	NetAddress addr(m_peerAddr);
	return addr;
}


void CRtpMediaChannel::setPeerMedium(int payload, int codec, int clockrate, const std::string& sprop)
{
	if (m_mediaType == av::MEDIA_TYPE_AUDIO)
	{
		m_format.m_audioCodec = (av::MediaCodec)codec;
		m_format.m_channels = 1;
		m_format.m_sampleRate = 8000;
	}
}

int CRtpMediaChannel::run()
{
	while (!m_canExit)
	{
        poll(0);

        MediaPacketPtr pkt;
        if (!m_pktQueue.pop(pkt, 500))
        {
            continue;
        }

        sendPacket(pkt);
	}
	return 0;
}

void CRtpMediaChannel::doStop()
{
	m_pktQueue.cancelWait();
}


void CRtpMediaChannel::onSlicePacket(const RtpPacket& pkt)
{
	sendPacket(pkt);
}


bool CRtpMediaChannel::openRtpSession(uint16_t port)
{
    m_session.reset(new JRTPSession(NULL));

    RTPUDPv4TransmissionParams transparams;
    RTPSessionParams sessparams;
    sessparams.SetOwnTimestampUnit(1.0/90000.0);
    sessparams.SetAcceptOwnPackets(true);
    sessparams.SetUsePollThread(false);

    transparams.SetRTPReceiveBuffer(1024 * 1024 * 4);
    transparams.SetRTPSendBuffer(1024 * 1024 * 4);

    transparams.SetPortbase(port);
    int ret = m_session->Create(sessparams, &transparams);
    if (ret != 0)
    {
        m_session.reset();
    }
    else
    {
        //m_session->SetDefaultPayloadType(m_payload);
        m_session->SetMaximumPacketSize(RTP_MAX_SIZE + sizeof(RTP_FIXED_HEADER)*2);
        m_session->SetDefaultTimestampIncrement(0);
        m_session->SetDefaultMark(false);
    }
    return (ret == 0);
}

void CRtpMediaChannel::closeRtpSession()
{
    if (m_session)
    {
    	RTPTime t(1, 0);
        std::string reason("session closed");
        m_session->BYEDestroy(t, reason.c_str(), reason.size());
        m_session.reset();
    }
}

void CRtpMediaChannel::clearCache()
{
    m_pktQueue.clear();
}

/**
 * Slice a media packet into RTP packets and send them.
 *
 * The packet is handed to the current packager, which calls back into this
 * object for every slice. Null or empty packets are ignored. The packet is
 * also dropped when no packager has been selected yet (onMediaFormat() was
 * never called), which previously dereferenced a null pointer.
 *
 * Slices are limited to m_maxPktSize, so a value configured through
 * setMaxPacetSize() now takes effect; the default equals RTP_MAX_SIZE.
 *
 * @param packet  media packet to send.
 */
void CRtpMediaChannel::sendPacket(MediaPacketPtr& packet)
{
	if (!packet)
	{
		return;
	}

	MediaPacket& pkt = *packet;
	if (pkt.empty())
	{
		return;
	}

	if (!m_packager)
	{
		// No format announced yet: nothing can slice this packet.
		return;
	}

	m_packager->slice(pkt, m_maxPktSize, this);
}

/**
 * Send one RTP payload through the session.
 *
 * Four-byte payloads whose first byte is 0x09 are skipped (presumably an
 * H.264 access unit delimiter — confirm). The timestamp increment passed to
 * SendPacket() is the distance from the last recorded timestamp; it is
 * non-zero only for marked packets and for audio, and m_lastTime advances
 * only in those cases. The first packet seeds m_lastTime with its own
 * timestamp. A send failure is logged and otherwise ignored.
 *
 * @param pkt  payload, size, timestamp and marker bit to send.
 */
void CRtpMediaChannel::sendPacket(const RtpPacket& pkt)
{
	const bool skip = (pkt.size == 4) && (pkt.data[0] == 0x09);
	if (skip)
	{
		return;
	}

	bool mark = pkt.mark;

	// The first packet has no predecessor to measure against.
	if (m_packetCount <= 0)
	{
		m_lastTime = pkt.ts;
	}

	m_packetCount ++;

	uint32_t tsIncrement = 0;
	const bool advancesClock = mark || (m_mediaType == av::MEDIA_TYPE_AUDIO);
	if (advancesClock)
	{
		tsIncrement = pkt.ts - m_lastTime;
		m_lastTime = pkt.ts;
	}

	const int status = m_session->SendPacket(pkt.data, pkt.size, m_payload, mark, tsIncrement);
	if (status != 0)
	{
		CLog::error("failed to send packet. size:%d\n", pkt.size);
	}
}

/**
 * Poll the session once and handle whatever packets have arrived.
 *
 * @param ms  unused.
 * @return always true.
 */
bool CRtpMediaChannel::poll(int ms)
{
	m_session->Poll();
	readAndHandle();

	const bool handled = true;
	return handled;
}


bool CRtpMediaChannel::readAndHandle()
{
    bool dataAvailable = true;
    //RTPTime timeout(0, 0);
    //m_session->WaitForIncomingData(timeout, &dataAvailable);

    //if (dataAvailable == false)
    //{
    //    return false;
    //}

    m_session->BeginDataAccess();

    bool working = m_session->GotoFirstSourceWithData();
	//CLog::debug("%s GotoFirstSourceWithData. %d\n", CLog::getShortTime(), working);

    while (working && !m_canExit)
    {
        //process packet
        RTPPacket* pPacket = NULL;
        pPacket = m_session->GetNextPacket();
        while (pPacket)
        {
            onRtpPacket(pPacket);

            if (m_canExit)
            {
                break;
            }

            pPacket = m_session->GetNextPacket();
        }

        working = m_session->GotoNextSourceWithData();
    }

    m_session->EndDataAccess();

    return true;
}

/**
 * Handle one received RTP packet and then release it.
 *
 * On an audio channel with receiving enabled, the payload is copied into a
 * new MediaPacket whose pts and dts are both the RTP timestamp, and that
 * packet is passed to fireMediaPacket(). In every case the RTP packet is
 * handed back to the session with DeletePacket().
 *
 * @param pPacket  packet obtained from the session.
 */
void CRtpMediaChannel::onRtpPacket(jrtplib::RTPPacket* pPacket)
{
	const bool deliver = (m_mediaType == av::MEDIA_TYPE_AUDIO) && m_recvEnabled;
	if (deliver)
	{
		uint8_t* payload = pPacket->GetPayloadData();
		size_t payloadSize = pPacket->GetPayloadLength();
		uint32_t timestamp = pPacket->GetTimestamp();

		av::MediaPacketPtr media(new av::MediaPacket());
		media->set_type(m_mediaType);
		media->copy(payload, payloadSize);
		media->set_pts(timestamp);
		media->set_dts(timestamp);

		fireMediaPacket(media);
	}

	m_session->DeletePacket(pPacket);
}

void CRtpMediaChannel::addDestination(const std::string& ip, int port)
{
	if (!m_session)
	{
		return;
	}

	uint32_t ipAddr = inet_addr(ip.c_str());
	ipAddr = ntohl(ipAddr);
	RTPIPv4Address dest(ipAddr, port);
	m_session->AddDestination(dest);
}

/**
 * Remove an IPv4 destination from the session.
 *
 * Does nothing when no session exists. The destination removed is the one
 * named by the arguments; previously the arguments were ignored and
 * m_peerAddr was always used instead.
 *
 * @param ip    destination IPv4 address in dotted notation.
 * @param port  destination RTP port.
 */
void CRtpMediaChannel::removeDestination(const std::string& ip, int port)
{
	if (!m_session)
	{
		return;
	}

	// The RTPIPv4Address is built from the ntohl()-converted address.
	uint32_t ipAddr = inet_addr(ip.c_str());
	ipAddr = ntohl(ipAddr);
	RTPIPv4Address dest(ipAddr, port);
	m_session->DeleteDestination(dest);
}

/**
 * Wait on the session for incoming data, for at most the given time.
 *
 * @param ms  maximum wait in milliseconds; split into whole seconds and
 *            remaining microseconds for RTPTime.
 */
void CRtpMediaChannel::timedwait(int ms)
{
	const int wholeSeconds = ms/1000;
	const int remainingMicros = (ms % 1000) * 1000;

	RTPTime delay(wholeSeconds, remainingMicros);
	m_session->WaitForIncomingData(delay, NULL);
}

void CRtpMediaChannel::enableRecv(bool enabled)
{
	m_recvEnabled = enabled;
}


} /* namespace av */
